package net.bwie.realtime.jtp.dwd.trade.function;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.bwie.realtime.jtp.dwd.trade.util.HbaseUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.Map;

/**
 * Enriches each incoming JSON record with a {@code province_name} field.
 *
 * <p>The lookup table is loaded once per task in {@link #open(Configuration)} via
 * {@code HbaseUtil.scanData("dim_base_province", "info", "name")} and kept in memory.
 * NOTE(review): the map is presumably keyed by the HBase row key (the province id) with
 * the {@code info:name} column as value; confirm against {@code HbaseUtil.scanData}.
 *
 * <p>Records whose {@code province_id} is missing or not present in the lookup table get
 * the placeholder name {@code "未知"}.
 *
 * @Author: FuHe
 * @Date: 2025/6/4
 */
public class LoadProvinceDimMapFunction extends RichMapFunction<String, String> {
    /** HBase table holding the province dimension. */
    private static final String DIM_TABLE = "dim_base_province";
    /** Column family read from the dimension table. */
    private static final String DIM_FAMILY = "info";
    /** Column whose value is used as the province name. */
    private static final String DIM_COLUMN = "name";
    /** JSON field carrying the join key. */
    private static final String PROVINCE_ID_FIELD = "province_id";
    /** JSON field written with the resolved name. */
    private static final String PROVINCE_NAME_FIELD = "province_name";
    /** Placeholder used when the province cannot be resolved. */
    private static final String UNKNOWN_PROVINCE = "未知";

    /**
     * Lookup table keyed by the join field. Populated in {@code open}, so it is runtime-only
     * state and must not be shipped with the serialized function, hence {@code transient}.
     */
    private transient Map<String, String> cacheMap;

    /**
     * Loads the province dimension into memory.
     *
     * @param parameters Flink configuration (unused here)
     * @throws IllegalStateException if the dimension loader returns {@code null}
     * @throws Exception             if the underlying HBase scan fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Map<String, String> loaded = HbaseUtil.scanData(DIM_TABLE, DIM_FAMILY, DIM_COLUMN);
        if (loaded == null) {
            // Fail at startup with a clear message instead of a bare NPE on the first record.
            throw new IllegalStateException(
                    "Failed to load province dimension from HBase table " + DIM_TABLE);
        }
        cacheMap = loaded;
    }

    /**
     * Adds {@code province_name} to the given JSON record.
     *
     * @param value a JSON object serialized as a string
     * @return the record with {@code province_name} added; a null/empty payload (for which
     *         the JSON parser yields no object) is returned unchanged
     * @throws Exception if {@code value} is not valid JSON
     */
    @Override
    public String map(String value) throws Exception {
        JSONObject record = JSON.parseObject(value);
        if (record == null) {
            // parseObject yields null for null/empty input; there is nothing to enrich,
            // so pass the payload through rather than failing the whole task.
            return value;
        }

        String provinceId = record.getString(PROVINCE_ID_FIELD);
        // Skip the lookup for a missing key: some Map implementations reject null keys.
        String provinceName = provinceId == null
                ? UNKNOWN_PROVINCE
                : cacheMap.getOrDefault(provinceId, UNKNOWN_PROVINCE);

        record.put(PROVINCE_NAME_FIELD, provinceName);
        return record.toJSONString();
    }

    /**
     * Releases the in-memory lookup table.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        // Drop the reference so the cached dimension data can be garbage collected.
        cacheMap = null;
        super.close();
    }
}
